rm(list=ls())
library(groundhog)
groundhog.library('taRifx', '2023-04-22')
library(dplyr)
library(haven)
library(fst)
library(taRifx)
library(doParallel)

cores <- 6

year_mat <- matrix(1990:2013, nrow = cores)
# year_mat <- matrix(seq(1993, 2013, 4), nrow = cores) 

cl <- makePSOCKcluster(cores)
clusterSetRNGStream(cl)
registerDoParallel(cl)

m <- foreach(y = 1:cores, .packages=c('dplyr', 'haven', 'fst', 'taRifx')) %dopar% {
  data_res <- NULL 

    # Find everyone ever running ----------------------------------------------
  
  candidates_2013 <-
    read_sas(paste("kv2013", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates_2009 <-
    read_sas(paste("kv2009", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates_2005 <-
    read_sas(paste("kv2005", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates_2001 <-
    read_sas(paste("kv2001", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates_1997 <-
    read_sas(paste("kv1997", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates_1993 <-
    read_sas(paste("kv1993", "_recodes_pnr_afid.sas7bdat", sep = "")) 
  
  candidates <- 
    unique(c(candidates_1993$PNR, 
             candidates_1997$PNR, 
             candidates_2001$PNR, 
             candidates_2005$PNR, 
             candidates_2009$PNR, 
             candidates_2013$PNR))
  years <- year_mat[y, ]
  

    
    # Build dataset ------------------------------------------------------
    
    
    for(k in  years ){
      #load population data as of Jan 1, following year
      year <- years[years == k]
      
      bef <- 
        read.fst(paste("bef", year + 1, ".fst", sep = ""))
      
      ## Some are assigned to very small municipalities in old data frames.
      ## New municpality codes even though before merger
      ## exclude if 30 or less with municipality code
      
      # subset to adults
      # and merge on candidates 
      data <-
        bef %>% 
        filter(ALDER > 17) %>%
        select(c("PNR", "KOM", "ALDER", "IE_TYPE", "KOEN")) 
      ndata <- nrow(data)
      
      small_kom_numbers <- 
        names(table(data$KOM)[table(data$KOM) <= 30])
      
      data <- 
        data %>%
        filter(! KOM %in% small_kom_numbers)
      ndata <- nrow(data)
      
      # load education as of Jan 1 in the election year 
      
      udda <- 
        read.fst(paste("udda", year, ".fst", sep = ""))
      
      data <- 
        left_join(data, udda, by = "PNR") %>%
        filter(!is.na(KOEN)) 
      if(ndata != nrow(data)) stop("3")
      
      # load income. 
      inc <-
        read.fst(paste("ind", year, ".fst", sep = "")) 
      
      inc <-
        inc %>%
        group_by(PNR) %>%
        summarise(income = sum(PERINDKIALT_13))
      
      print("income")
      
      data <- 
        left_join(data, inc, by = "PNR") %>%
        filter(!is.na(KOEN)) 
      if(ndata != nrow(data)) stop("4")
      
      if(year >= 2000){
        # Load sector
        sector <- 
          read.fst(paste("ras", year, ".fst", sep = "")) 
        
        # Load code for recoding sector
        source("recode_ras.r")
        
        # Recode sector
        sector$sector <- 
          ras_recode(sector) 
        
        se_bu <- sector
        da_bu <- data
        
        if (year < 2008){
          
          set.seed(020518)
          
          # Merg sector on data. Sample one where more than one sector
          sector_var <- 
            names(sector)
          sector <- 
            left_join(data, sector, by = "PNR") %>%
            filter(!is.na(KOEN)) %>%
            select(sector_var)
          
          sector_1 <- 
            sector %>%
            filter(!is.na(sector)) %>%
            group_by(PNR) %>% 
            summarise(count = n())
          
          sector_unique <- 
            left_join(sector, sector_1, by ="PNR") %>%
            filter(count == 1 & !is.na(sector))
          
          
          sector_2 <- 
            left_join(sector, sector_1, by ="PNR") %>%
            filter(count > 1 & !is.na(sector))
          
          sector_2$random <- rnorm(n = nrow(sector_2))
          
          sector_2 <-
            sector_2 %>%
            group_by(PNR) %>% 
            mutate(keep = random == min(random)) %>%
            ungroup() %>%
            filter(keep == 1) 
          
          sector_2$random <- NULL
          sector_2$keep   <- NULL
          
          sector_joint <- 
            rbind(sector_unique,
                  sector_2)
          
          sector_joint$count <- NULL
          
          sector_missing <- 
            sector %>%
            filter(is.na(sector)) %>%
            filter(! PNR %in% sector_joint$PNR)
          
          sector_missing$random <- rnorm(n = nrow(sector_missing))
          
          sector_missing <- 
            sector_missing %>%
            group_by(PNR) %>% 
            mutate(keep = random == min(random)) %>%
            ungroup() %>%
            filter(keep == 1) 
          
          sector_missing$random <- NULL
          sector_missing$keep   <- NULL
          
          sector_joint <- 
            rbind(sector_joint,
                  sector_missing)  
          
          sector <- 
            sector_joint %>%
            select(c("PNR", "sector"))
          
          data <- 
            left_join(data, sector, by = "PNR") %>% 
            filter(!is.na(KOEN)) 
          
          
        }
        
        
        
        if(ndata != nrow(data)) stop("5")

        if (year >= 2008){
          
          # Some PNRs have multiple entries 
          # Define main sector as the sector in which most hours were worked 
          sector_hours <- 
            sector %>% 
            filter(!is.na(LOENTIMER)) %>% 
            group_by(PNR) %>% 
            summarise(max_hours = max(LOENTIMER)) 
          
          sector_1 <- 
            left_join(sector, sector_hours, by = "PNR") %>%
            filter(LOENTIMER == max_hours) %>%
            group_by(PNR) %>% 
            summarise(count = n())
          
          # Subset to those with uniquely identified working sector based on most hours  worked
          sector_unique_max <- 
            left_join(sector, sector_1, by = "PNR") %>% 
            left_join(., sector_hours, by = "PNR") %>% 
            filter(count == 1 & LOENTIMER == max_hours)
          
          # Set seed for sampling sector where no clear main sector is defined
          set.seed(020518)
          
          # Subset to those with multiple occupations tied for main sector. Sample randomly one observation
          sector_dupli_max <- 
            left_join(sector, sector_1, by = "PNR") %>% 
            left_join(., sector_hours, by = "PNR") %>% 
            filter(count > 1 & LOENTIMER == max_hours) %>%
            group_by(PNR) %>% 
            sample_n(1) %>%
            ungroup()
          
          # Subset to those with NA for working hours
          sector_2 <- 
            left_join(sector, sector_hours, by = "PNR") %>%
            filter(is.na(max_hours)) %>%
            group_by(PNR) %>% 
            summarise(count = n())
          
          # subset to those with entries in only one sector (which is NA hours)
          sector_unique_no_max <- 
            left_join(sector, sector_2, by = "PNR") %>% 
            left_join(., sector_hours, by = "PNR") %>% 
            filter(count == 1 & is.na(max_hours))
          
          # subset to those with entries in multiple sectors (which is NA for hours). Sample one
          sector_dupli_no_max <- 
            left_join(sector, sector_2, by = "PNR") %>% 
            left_join(., sector_hours, by = "PNR") %>% 
            filter(count > 1 & is.na(max_hours)) %>%
            group_by(PNR) %>% 
            sample_n(1) %>%
            ungroup()
          
          # Combine for subset to one frame
          sector <- 
            rbind(sector_dupli_max,
                  sector_dupli_no_max,
                  sector_unique_max,
                  sector_unique_no_max) %>%
            select("PNR", "sector")
          
          data <- 
            left_join(data, sector, by = "PNR") %>% 
            filter(!is.na(KOEN)) 
          if(ndata != nrow(data)) stop("5")  
        }
        
      }
      
      
      ## recode variables and keep non-missing income only
      if(year < 2000){
        data <-
          data %>%
          filter(!is.na(income)) %>% 
          mutate(education_four = 
                   ifelse(education == "Erhvervsfaglige praktik- og hovedforl?b", 1,
                          ifelse(education == "Almengymnasiale uddannelser" | 
                                   education == "Erhvervsgymnasiale uddannelser", 2, 
                                 ifelse(education == "Forskeruddannelser" | 
                                          education == "Bachelor" |
                                          education == "Korte videreg?ende uddannelser" |
                                          education == "Mellemlange videreg?ende uddannelser" |
                                          education == "Lange videreg?ende uddannelser", 3, 0))),
                 native = IE_TYPE == 1,
                 education_four = ifelse(!is.na(education), education_four, 5))
        
      }
      
      if(year >= 2000){
        data <-
          data %>%
          filter(!is.na(income)) %>% 
          mutate(education_four = 
                   ifelse(education == "Erhvervsfaglige praktik- og hovedforl?b", 1,
                          ifelse(education == "Almengymnasiale uddannelser" | 
                                   education == "Erhvervsgymnasiale uddannelser", 2, 
                                 ifelse(education == "Forskeruddannelser" | 
                                          education == "Bachelor" |
                                          education == "Korte videreg?ende uddannelser" |
                                          education == "Mellemlange videreg?ende uddannelser" |
                                          education == "Lange videreg?ende uddannelser", 3, 0))),
                 native = IE_TYPE == 1,
                 education_four = ifelse(!is.na(education), education_four, 5),
                 sector  = ifelse(is.na(sector), "missing", sector))
        
      }
      
      data$age_groups <- 88
      
      for (i in seq(18, 83, 5)){
        data$age_groups[data$ALDER >= i & data$ALDER < (i + 5)] <- i
      }           
      
      
      for(i in 1:2) {
        for(j in 1:2){
          for(h in 0:1){
            if(j == 1){
              data_sub <- 
                data %>% 
                filter(KOEN == i & 
                         native == h &
                         ALDER < 65) 
            }
            
            if(j == 2){
              data_sub <- 
                data %>% 
                filter(KOEN == i &
                         native == h & 
                         ALDER >= 65) 
            }
            
            data_est <- 
              data_sub %>%
              filter(! PNR %in% candidates) 
            
            
            print("modelling")
            
            
            if(year < 2000){
              lm_res     <- lm(income ~ as.factor(education_four)  * as.factor(age_groups)  + 
                                 as.factor(KOM), data = data_est)
            }
            
            if(year >= 2000){
              lm_res     <- lm(income ~ as.factor(education_four) * as.factor(sector) *
                                 as.factor(age_groups) + as.factor(KOM), data = data_est)  
            }
            
            
            
            pred_income <- predict(object = lm_res, newdata = data_sub)
            
            data_sub <- 
              data_sub %>%
              mutate(res_income = income - pred_income,
                     year = year,
                     res_income = (res_income - mean(res_income)) / sd(res_income))   %>%
              select("PNR", "year", "res_income")
            
            
            data_res <- rbind(data_res, data_sub)
            
            
            
            print(c(year, i, j))
            print(Sys.time())
          }
        }
      }
      
    
  }
  return(data_res)
}
stopCluster(cl)

data_res <- NULL

for(i in 1:cores){
  data_res <- rbind(data_res, m[[i]])
  
  
}

data_comp <- 
  data_res %>% 
  mutate(four_years = floor((year - 1990)/4) * 4 + 1993) %>%
  group_by(four_years, PNR) %>% 
  summarise(inc_res = mean(res_income, na.rm = TRUE))

length(unique(data_comp$PNR))
length(unique(data_res$PNR))
summary(as.vector(data_comp$inc_res))
summary(as.vector(data_res$res_income))

write.fst(data_comp, "data_comp.fst")
write.fst(data_res,  "data_comp_yearly.fst")

